home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / et / et3_0-a1.lha / et3 / src / StreamConnection.C < prev    next >
C/C++ Source or Header  |  1992-08-26  |  9KB  |  456 lines

  1. #ifdef __GNUG__
  2. #pragma implementation
  3. #endif
  4.  
  5. #include "StreamConnection.h"
  6.  
  7. #include "Class.h"
  8. #include "Error.h"
  9. #include "String.h"
  10. #include "MemBuf.h"
  11. #include "OrdColl.h"
  12. #include "CLib.h"
  13. #include "Math.h"
  14. #include "Env.h"
  15.  
  16. const int cHeaderSize= 20;
  17.  
  18. bool gIACDebug;
  19. extern "C" void system_nonblock(int fd);
  20.  
  21. //---- Message -----------------------------------------------------------------
  22.  
  23. block::block()
  24. {
  25.     len= cHeaderSize;
  26.     pos= 0;
  27.     buf= 0;
  28. }
  29.  
  30. block::~block()
  31. {
  32.     SafeDelete(buf);
  33. }
  34.  
  35. Message::Message()
  36. {
  37.     to= from= 0;
  38.     serial= 0;
  39. }
  40.     
  41. Message::Message(int t, int f, int reply, char *buf, int l, bool makecopy)
  42. {
  43.     to= t;
  44.     from= f;
  45.     serial= reply;
  46.     if (l < 0)
  47.     l= strlen(buf)+1;
  48.     data[1].len= l;
  49.     if (makecopy) {
  50.     data[1].buf= new char[l];
  51.     strncpy(data[1].buf, buf, l);
  52.     } else
  53.     data[1].buf= buf;
  54. }
  55.  
  56. int Message::Read(int fd)
  57. {
  58.     for (int i= 0; i < 2; i++) {
  59.     block *b= &data[i];
  60.     
  61.     if (b->pos < b->len) {
  62.         if (b->buf == 0)
  63.         b->buf= new char[b->len];
  64.         int cnt= CLib::Read(fd, b->buf+b->pos, b->len-b->pos);
  65.  
  66.         if (cnt == 0 || cnt == -1)
  67.         return -1;
  68.         if (cnt == -2)
  69.         return -2;
  70.     
  71.         b->pos+= cnt;
  72.         
  73.         if (b->pos == b->len) {
  74.         if (i == 1) {
  75.             return 0;
  76.         }
  77.         if (sscanf(b->buf, "%d %d %d %d",
  78.                     &to, &from, &data[1].len, &serial) != 4)
  79.             return -1;
  80.         }
  81.     }
  82.     }
  83.     return 0;
  84. }
  85.  
  86. int Message::Write(int fd)
  87. {
  88.     for (int i= 0; i < 2; i++) {
  89.     block *b= &data[i];
  90.     
  91.     while (b->pos < b->len) {
  92.         int wl= CLib::Write(fd, b->buf+b->pos, b->len-b->pos);
  93.         if (wl == -1)   // fatal error
  94.         return -1;
  95.         if (wl == -2)   // try again
  96.         return -2;
  97.         b->pos+= wl;
  98.     }
  99.     }
  100.     Print("Write", 0);
  101.     return 0;                   // completely written
  102. }
  103.  
  104. void Message::Print(char *msg, char*)
  105. {
  106.     if (gIACDebug) {
  107.     int l= data[1].len;
  108.     
  109.     fprintf(stderr, "%s fr:%d to:%d ser:%d l:%d <", msg, from, to, serial, l);
  110.     if (l < 60) {
  111.         
  112.         fprintf(stderr, "%s>\n", data[1].buf);
  113.     } else {
  114.         fwrite(data[1].buf, 1, 60, stderr);
  115.         fprintf(stderr, "...>\n");
  116.     }
  117.     }
  118. }
  119.  
  120. //---- StreamConnection --------------------------------------------------------
  121.  
  122. NewMetaImpl0(StreamConnection, SysEvtHandler); 
  123.  
  124. StreamConnection::StreamConnection(int f) : SysEvtHandler(f)
  125. {
  126.     gIACDebug= Env::GetValue("IAC.Debug", FALSE);
  127.     serial= 100;
  128.     wqueue= 0;
  129.     queue= 0;
  130.     rm= 0;
  131.     sid= 0;
  132.     name= 0;
  133.     //SetName(gProgname);
  134.     error= !Open(f);
  135.     gSystem->AddWorkHandler(this);
  136. }
  137.  
  138. StreamConnection::~StreamConnection()
  139. {
  140.     error= TRUE;
  141.     CLib::Close(GetResourceId());
  142.     SafeDelete(name);
  143. }
  144.  
  145. void StreamConnection::SetName(char *n)
  146. {
  147.     SafeDelete(name);
  148.     name= strsave(n);
  149. }
  150.     
  151. bool StreamConnection::Open(int fd)
  152. {
  153.     if (fd > 0) {
  154.     SetResourceId(fd);
  155.     system_nonblock(fd);
  156.     SetResourceId(fd);
  157.     gSystem->AddFileInputHandler(this);
  158.     gSystem->AddFileOutputHandler(this);
  159.     error= FALSE;
  160.     return TRUE;
  161.     }
  162.     error= TRUE;
  163.     return FALSE;
  164. }
  165.  
  166. int StreamConnection::Error(int code)
  167. {
  168.     error= TRUE;
  169.     fprintf(stderr, "StreamConnection::Error: disconnecting from server; reason: %d\n", code);
  170.     return 0;
  171. }
  172.  
  173. bool StreamConnection::Reconnect()
  174. {
  175.     /*
  176.     if (error) {
  177.     int sock= gSystem->OpenConnection(SERVERNAME, SERVICENAME);
  178.     if (sock >= 0)
  179.         error= !Open(sock);
  180.     }
  181.     */
  182.     return error;
  183. }
  184.  
  185. void StreamConnection::SendMessage(Message *m)
  186. {
  187.     if (m) {
  188.     SafeDelete(m->data[0].buf);
  189.     m->data[0].len= cHeaderSize;
  190.     m->data[0].pos= 0;
  191.     m->data[0].buf= new char[cHeaderSize];
  192.     m->data[1].pos= 0;
  193.     sprintf(m->data[0].buf, "%d %d %d %d", m->to, m->from, m->data[1].len, m->serial);
  194.     if (wqueue == 0)
  195.         wqueue= new OrdCollection;
  196.     wqueue->Add(m);
  197.     }
  198. }
  199.  
  200. int StreamConnection::DrainOutput(int timeout)
  201. {
  202.     if (Reconnect())
  203.     return -1;
  204.     // wait for output to drain
  205.     while (wqueue) {
  206.     if (gSystem->CanWrite(GetResourceId(), timeout)) {
  207.         fprintf(stderr, "DrainOutput: timeout\n");
  208.         return -1;
  209.     }
  210.     Notify(eSysEvtWrite, GetResourceId());
  211.     }
  212.     return 0;
  213. }
  214.  
  215. int StreamConnection::WaitForMessage(int timeout)
  216. {
  217.     if (Reconnect())
  218.     return -1;
  219.     if (gSystem->CanRead(GetResourceId(), timeout))
  220.     return -1;
  221.     Notify(eSysEvtRead, GetResourceId());
  222.     return 0;
  223. }
  224.  
  225. Message *StreamConnection::WaitForReply(int seq, int timeout)
  226. {
  227.     if (Reconnect())
  228.     return 0;
  229.     for (;;) {
  230.     if (queue && queue->Size() > 0) {
  231.         register Message *m;
  232.         Iter next(queue);
  233.         
  234.         while (m= (Message*) next()) {
  235.         if (m->IsReply()) {    // a reply
  236.             int s= m->serial;
  237.             queue->RemovePtr(m);
  238.             if (s == -seq) {
  239.             return m;
  240.             } else {
  241.             delete m;
  242.             fprintf(stderr, "lost sync\n");
  243.             return 0;
  244.             }
  245.         }
  246.         }
  247.     }
  248.     
  249.     DrainOutput(4000);
  250.  
  251.     if (WaitForMessage(timeout) == -1) {
  252.         fprintf(stderr, "CanRead: timeout\n");
  253.         return 0;
  254.     }
  255.     }
  256.  
  257.     return 0;
  258. }
  259.  
  260. bool StreamConnection::HasInterest(SysEventCodes code)
  261. {
  262.     if (error)
  263.     return FALSE;
  264.     if (SysEvtHandler::HasInterest(code) == FALSE)
  265.     return FALSE;
  266.     if (code == eSysEvtWrite)
  267.     return wqueue != 0;
  268.     return TRUE;
  269. }
  270.  
  271. void StreamConnection::Notify(SysEventCodes sec, int)
  272. {
  273.     if (error)
  274.     return;
  275.     
  276.     switch (sec) {
  277.     case eSysEvtWork:
  278.     if (queue && queue->Size() > 0) {
  279.         Message *m= (Message*) queue->RemoveFirst();
  280.         if (m)
  281.         Dispatch0(m);
  282.     }
  283.     break;
  284.     
  285.     case eSysEvtRead:
  286.     if (rm == 0)
  287.         rm= new Message;
  288.         
  289.     switch (rm->Read(GetResourceId())) {
  290.     case 0:     // got one
  291.         if (queue == 0)
  292.         queue= new OrdCollection;
  293.         if (rm->from == 0)
  294.         rm->from= sid;
  295.         rm->Print("Received", 0);
  296.         queue->Add(rm);
  297.         rm= 0;
  298.         break;
  299.     case -1:    // fatal error
  300.         Error(2);
  301.         break;
  302.     case -2:    // try again later
  303.         break;
  304.     }
  305.     break;
  306.  
  307.     case eSysEvtWrite:
  308.     if (wqueue && wqueue->Size() > 0) {
  309.         Message *m= (Message*) wqueue->First();
  310.         if (m) {
  311.         switch (m->Write(GetResourceId())) {
  312.         case 0:     // completely written
  313.             if ((Message*)wqueue->RemovePtr(m) == m) {
  314.             SafeDelete(m);
  315.             if (wqueue->Size() <= 0)
  316.                 SafeDelete(wqueue);
  317.             } else  // inconsistent queue
  318.             Error(5);
  319.             break;
  320.         case -1:    // fatal error
  321.             Error(4);
  322.             break;
  323.         case -2:    // try again
  324.             break;
  325.         }
  326.         }
  327.     }
  328.     break;
  329.     }
  330. }
  331.  
  332. void StreamConnection::Dispatch0(Message *m)
  333. {
  334.     char req[200], *bp= req, *buf= m->data[1].buf, *cp, *retbuf= 0;
  335.     int retlen= -1, len= m->data[1].len;
  336.  
  337.     cp= buf;
  338.     while (*cp && *cp != ' ')
  339.     *bp++= *cp++;
  340.     *bp= 0;
  341.     if (*cp == ' ')
  342.     cp++;
  343.     len-= (cp-buf);
  344.     
  345.     bool rc= Dispatch2(req, cp, len, retbuf, retlen, m);
  346.     if (retbuf)
  347.     IntSendTo(m->from, -m->serial, 0, retbuf, retlen);
  348.  
  349.     if (rc)
  350.     delete m;
  351. }
  352.  
  353. bool StreamConnection::Dispatch2(char *req, char *buf,
  354.                 int len, char *&retbuf, int &retlen, Message *m)
  355. {
  356.     if (!m->IsReply())      // not a reply
  357.     Dispatch(m->from, req, buf, len, retbuf, retlen);
  358.     else
  359.     fprintf(stderr, "StreamConnection::Dispatch2: oops: got a reply!!\n");
  360.     return TRUE;
  361. }
  362.  
  363. //---- old interface -----------------------------------------------------------
  364.  
  365. int StreamConnection::SendTo(int to, char *req, char *buf, int l)
  366. {
  367.     return IntSendTo(to, serial++, req, buf, l);
  368. }
  369.  
  370. int StreamConnection::Talk(int to, char *req, char *buf, int len, 
  371.                           char **retbuf, int *retlen)
  372. {
  373.     if (Reconnect())
  374.     return -1;
  375.  
  376.     int seq= serial++;
  377.  
  378.     if (!IntSendTo(to, seq, req, buf, len))
  379.     return FALSE;
  380.  
  381.     Message *m= WaitForReply(seq, 20000);
  382.     
  383.     if (m) {
  384.     if (retbuf) {
  385.         *retbuf= m->data[1].buf;
  386.         m->data[1].buf= 0;
  387.     }
  388.     if (retlen)
  389.         *retlen= m->data[1].len;
  390.     delete m;
  391.     }
  392.     
  393.     return TRUE;
  394. }
  395.  
  396. void StreamConnection::Talk(char *prog, char *va_(fmt), ...)
  397. {
  398.     va_list ap;
  399.  
  400.     char *rbuf= 0;
  401.     int to;
  402.     va_start(ap, va_(fmt));
  403.  
  404.     Talk(1, "start", prog, -1, &rbuf);
  405.     
  406.     if (rbuf && sscanf(rbuf, "%d", &to) == 1) {
  407.     char buf[2000];
  408.     strcpy(buf, prog);
  409.     strcat(buf, ":");
  410.     vsprintf(&buf[strlen(buf)], va_(fmt), ap);
  411.     fprintf(stderr, "[%s]\n", buf);
  412.     Talk(to, buf);
  413.     }
  414.  
  415.     va_end(ap);
  416. }
  417.  
  418. int StreamConnection::IntSendTo(int to, int reply, char *req, char *buf, int l)
  419. {
  420.     if (Reconnect())
  421.     return FALSE;
  422.     
  423.     int rl= 0;
  424.     
  425.     if (req)
  426.     rl= strlen(req);
  427.     
  428.     if (buf) {
  429.     if (l < 0)
  430.         l= strlen(buf);
  431.     } else
  432.     l= 0;
  433.     
  434.     int tl= l + rl + 1;
  435.     if (l > 0 && rl > 0)
  436.     tl++;
  437.     
  438.     char *bp= new char[tl];
  439.     
  440.     if (rl > 0)
  441.     strcpy(bp, req);
  442.     if (l > 0 && rl > 0)
  443.     strcat(bp, " ");
  444.     if (l > 0)
  445.     strcat(bp, buf);
  446.     
  447.     SendMessage(new Message(to, 0, reply, bp, tl, FALSE));
  448.  
  449.     return TRUE;
  450. }
  451.  
  452. void StreamConnection::Dispatch(int, char*, char*, int, char *&, int&)
  453. {
  454. }
  455.  
  456.